package com.cctv.cndms.common.consumer.queue;

import com.cctv.cndms.common.consumer.Message;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Bounded consumer queue that rejects messages whose ID is already tracked.
 *
 * <p>A message ID is recorded when the message is accepted by {@link #put(Message)}
 * and is forgotten only by {@link #remove(Message)}; {@link #take()} hands out the
 * message but leaves its ID in the set. NOTE(review): presumably this keeps messages
 * that are still being processed de-duplicated until the caller invokes
 * {@code remove} -- confirm against the callers.
 *
 * <p>All state is guarded by a single {@link ReentrantReadWriteLock}: mutating
 * operations take the write lock, pure queries take the read lock. Blocking
 * operations wait on conditions of the write lock, which releases the lock while
 * waiting, so a blocked producer never stalls consumers and vice versa.
 *
 * @author heyingcheng
 * @email heyingcheng@ctvit.com.cn
 * @date 2018/4/23 1:17
 */
public class ConsumerBlockingQueue {

    /** Maximum number of messages the queue can hold. */
    public static final int QUEUE_CAPACITY = 50000;

    /**
     * Bounded FIFO storage. Only its non-blocking methods are called, and always
     * with one of the locks below held; blocking is done on the conditions instead.
     */
    private final BlockingQueue<Message> messageQueue;
    /** IDs of all tracked messages; consulted by {@code put} to drop duplicates. */
    private final Set<String> messageIdSet;
    /** Lock guarding both {@code messageQueue} and {@code messageIdSet}. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** Shared lock used by the read-only queries. */
    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    /** Exclusive lock used by every mutating operation. */
    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    /** Signalled when a message has been enqueued; awaited by {@code take}. */
    private final Condition notEmpty = writeLock.newCondition();
    /** Signalled when a slot has been freed; awaited by {@code put}. */
    private final Condition notFull = writeLock.newCondition();

    /** Creates an empty queue with room for {@link #QUEUE_CAPACITY} messages. */
    public ConsumerBlockingQueue() {
        messageQueue = new ArrayBlockingQueue<Message>(QUEUE_CAPACITY);
        messageIdSet = new HashSet<String>();
    }

    /**
     * Enqueues a message unless its ID is already tracked, waiting for free space
     * if the queue is full.
     *
     * <p>The write lock is released while waiting, so consumers can drain the queue
     * and unblock this call. The duplicate check is repeated after every wake-up
     * because another producer may have enqueued the same ID in the meantime.
     *
     * @param message the message to add; must not be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for free space
     * @author heyingcheng
     * @date 2018/4/23 1:35
     */
    public void put(Message message) throws InterruptedException {
        writeLock.lockInterruptibly();
        try {
            String messageId = message.getMessageId();
            while (true) {
                if (messageIdSet.contains(messageId)) {
                    // Duplicate: silently dropped.
                    return;
                }
                if (messageQueue.offer(message)) {
                    break;
                }
                // Queue is full: await() releases the write lock until a slot is freed.
                notFull.await();
            }
            messageIdSet.add(messageId);
            notEmpty.signal();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of the queue, waiting if the queue is empty.
     *
     * <p>The ID of the returned message stays in the ID set until
     * {@link #remove(Message)} is called for it. The write lock is released while
     * waiting, so producers can enqueue and unblock this call.
     *
     * @return the message at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for a message
     * @author heyingcheng
     * @date 2018/4/23 1:42
     */
    public Message take() throws InterruptedException {
        writeLock.lockInterruptibly();
        try {
            Message head = messageQueue.poll();
            while (head == null) {
                // Queue is empty: await() releases the write lock until a put succeeds.
                notEmpty.await();
                head = messageQueue.poll();
            }
            // signalAll rather than signal: a woken producer may turn out to hold a
            // duplicate and return without using the slot, which must not strand
            // the other waiting producers.
            notFull.signalAll();
            return head;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Forgets the message's ID and removes the message from the queue if it is
     * still queued.
     *
     * @param message the message to remove; must not be {@code null}
     * @return {@code true} if the message was present in the queue and has been
     *         removed, {@code false} otherwise (the ID is forgotten either way)
     * @author heyingcheng
     * @date 2018/4/23 1:50
     */
    public boolean remove(Message message) {
        writeLock.lock();
        try {
            messageIdSet.remove(message.getMessageId());
            boolean removed = messageQueue.remove(message);
            if (removed) {
                // A slot was freed; let blocked producers retry.
                notFull.signalAll();
            }
            return removed;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns a snapshot of the currently tracked message IDs.
     *
     * @return a new set, independent of the internal one, containing every tracked ID
     * @author heyingcheng
     * @date 2018/4/23 1:38
     */
    public Set<String> getMessageIds() {
        readLock.lock();
        try {
            return new HashSet<String>(messageIdSet);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the number of messages currently in the queue.
     *
     * @return the current queue size
     * @author heyingcheng
     * @date 2018/4/23 1:38
     */
    public int size() {
        readLock.lock();
        try {
            return messageQueue.size();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Reports whether the queue currently holds no messages.
     *
     * @return {@code true} if the queue is empty
     * @author heyingcheng
     * @date 2018/4/23 1:45
     */
    public boolean isEmpty() {
        readLock.lock();
        try {
            return messageQueue.isEmpty();
        } finally {
            readLock.unlock();
        }
    }

}

